
Overview

The EDL Pipeline uses aggressive multi-threading to fetch data for ~2,775 stocks efficiently. Each script is optimized with specific thread counts and timeout values based on endpoint characteristics and observed rate limits.
Improper threading configuration can lead to API rate limits (HTTP 429), connection timeouts, or IP blocking. Follow the configurations documented here.

Thread Configuration by Endpoint

Each script uses different thread counts optimized for the specific endpoint:
Script | Threads | Timeout | Rationale
fetch_company_filings.py | 20 | 10s | Dual-endpoint fetches (2 requests per stock)
fetch_new_announcements.py | 40 | 10s | Small payloads, fast responses
fetch_advanced_indicators.py | 50 | 10s | Lightweight data, high throughput
fetch_market_news.py | 15 | 10s | Rate-sensitive endpoint (429 errors observed)
fetch_all_ohlcv.py | 15 | 15s | Large historical data, chunked requests
fetch_fundamental_data.py | N/A | 30s | Sequential batching (100 ISINs per batch)
Thread counts were determined through production testing. Increasing threads beyond these values results in rate limiting or connection errors.
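As a quick reference, the settings above can be collected into one mapping (a sketch only; the actual scripts each define their own MAX_THREADS and timeout constants):

```python
# Consolidated view of the per-script settings documented above.
# "threads": None means the script runs sequentially with batching.
THREAD_CONFIG = {
    "fetch_company_filings.py":     {"threads": 20,   "timeout": 10},
    "fetch_new_announcements.py":   {"threads": 40,   "timeout": 10},
    "fetch_advanced_indicators.py": {"threads": 50,   "timeout": 10},
    "fetch_market_news.py":         {"threads": 15,   "timeout": 10},
    "fetch_all_ohlcv.py":           {"threads": 15,   "timeout": 15},
    "fetch_fundamental_data.py":    {"threads": None, "timeout": 30},
}
```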

Detailed Thread Analysis

High Concurrency (40-50 threads)

Scripts:
  • fetch_new_announcements.py - 40 threads
  • fetch_advanced_indicators.py - 50 threads
Why it works:
  • Small request/response payloads
  • Stateless endpoints
  • Fast API response times (under 500ms)
  • No observed rate limiting
Configuration:
MAX_THREADS = 50  # Fast parallel execution

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    future_to_stock = {
        executor.submit(fetch_indicators, stock): stock 
        for stock in master_list
    }
    
    for future in as_completed(future_to_stock):
        res = future.result()
        # Process result
Performance:
  • Completes 2,775 stocks in ~3-5 minutes
  • Success rate: >99%
  • Minimal retries needed

Medium Concurrency (15-20 threads)

Scripts:
  • fetch_company_filings.py - 20 threads
  • fetch_market_news.py - 15 threads
  • fetch_all_ohlcv.py - 15 threads
Why limited:
  • Larger response payloads
  • Multiple requests per stock (filings: 2 endpoints)
  • Rate-sensitive APIs (news endpoint)
  • Historical data chunking (OHLCV)
Configuration:
MAX_THREADS = 20  # Moderate concurrency: 2 requests per stock ≈ 40 in flight

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    future_to_stock = {
        executor.submit(fetch_filings, item): item["Symbol"]
        for item in stock_list
    }

    total = len(stock_list)
    count = 0
    for future in as_completed(future_to_stock):
        count += 1
        res = future.result()

        if count % 100 == 0:
            print(f"[{count}/{total}] completed")
Company Filings Specific: Makes 2 API calls per stock (hybrid fetch):
# Endpoint 1: Legacy filings
url1 = "https://ow-static-scanx.dhan.co/staticscanx/company_filings"
res1 = requests.post(url1, json=payload1, headers=headers, timeout=10)

# Endpoint 2: LODR filings
url2 = "https://ow-static-scanx.dhan.co/staticscanx/lodr"
res2 = requests.post(url2, json=payload2, headers=headers, timeout=10)

# Merge and deduplicate results
With 20 threads and 2 requests each, actual concurrent requests ≈ 40.
Performance:
  • Filings: ~8-12 minutes for 2,775 stocks
  • News: ~10-15 minutes for 2,775 stocks
  • OHLCV: ~30 minutes for 2,775 stocks (chunked historical data)

Sequential with Batching

Script:
  • fetch_fundamental_data.py
Why sequential:
  • Endpoint supports batch requests (100 ISINs per call)
  • More efficient than parallel single-ISIN requests
  • Prevents overwhelming the endpoint
Configuration:
batch_size = 100

for i in range(0, total_isins, batch_size):
    batch_isins = all_isins[i:i + batch_size]
    
    payload = {
        "data": {
            "isins": batch_isins  # 100 ISINs at once
        }
    }
    
    response = requests.post(api_url, json=payload, headers=headers, timeout=30)
    
    # Be polite to the server
    time.sleep(0.5)  # 500ms delay between batches
Performance:
  • Completes 2,775 stocks in ~28 batches (100 ISINs each)
  • Total time: ~2-3 minutes
  • Success rate: >99.5%
Batching is 10x faster than parallel single-ISIN requests for this endpoint.
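The batch count follows directly from the numbers above: with 100 ISINs per request, ~2,775 stocks need only 28 API calls.

```python
import math

total_isins = 2775
batch_size = 100

# 28 requests instead of 2,775 single-ISIN calls
num_batches = math.ceil(total_isins / batch_size)
print(num_batches)  # 28
```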

Timeout Configuration

Timeout Values

Timeout | Use Case | Scripts
10s | Standard API calls | Most fetch scripts
15s | Large responses (OHLCV, NSE CSVs) | fetch_all_ohlcv.py, fetch_complete_price_bands.py
30s | Batch fundamental data | fetch_fundamental_data.py

Why Timeouts Matter

# Without timeout - can hang indefinitely
response = requests.post(url, json=payload)  # BAD

# With timeout - fails fast
response = requests.post(url, json=payload, timeout=10)  # GOOD
Timeout failure handling:
try:
    response = requests.post(api_url, json=payload, headers=headers, timeout=10)
    if response.status_code == 200:
        # Process response
        return "success"
    return f"http_{response.status_code}"
except requests.Timeout:
    return "timeout"
except Exception as e:
    return "error"

Rate Limit Handling

HTTP 429 Detection

Only the Market News API shows rate limiting behavior:
if response.status_code == 429:
    time.sleep(2)  # 2-second backoff
    return "rate_limit"
Why 429 occurs:
  • Too many requests from same IP in short window
  • Burst traffic during parallel execution
Mitigation:
  • Limited to 15 threads (not 40-50 like other endpoints)
  • 2-second backoff on 429 response
  • Automatic retry in next run

Exponential Backoff (Best Practice)

For production implementations, consider exponential backoff:
import time

def fetch_with_retry(url, payload, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.post(url, json=payload, timeout=10)
            
            if response.status_code == 200:
                return response.json()
            
            if response.status_code == 429:
                # Exponential backoff: 2s, 4s, 8s
                wait_time = 2 ** attempt
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
                continue
            
            return None
            
        except requests.Timeout:
            if attempt < max_retries - 1:
                time.sleep(1)
                continue
            return None
    
    return None

Batch Delays

Inter-batch Sleep

Fundamental Data:
time.sleep(0.5)  # 500ms delay between batches
Why needed:
  • Be polite to server
  • Prevent triggering rate limits
  • Smooth out request spikes
Effect:
  • Adds ~10-15 seconds total to runtime
  • Prevents API throttling
  • Improves overall reliability

No Delays for Threaded Scripts

Threaded scripts (announcements, indicators, etc.) don’t use inter-request delays because:
  • ThreadPoolExecutor naturally staggers requests
  • Individual timeouts provide natural pacing
  • No rate limiting observed at current thread counts
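The pool's built-in cap can be seen with a toy example: however many tasks are submitted, max_workers bounds how many run at once, which is what makes explicit per-request delays unnecessary.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_THREADS = 5
active = 0   # tasks currently running
peak = 0     # highest concurrency observed
lock = threading.Lock()

def task(_):
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.01)  # stand-in for an API call
    with lock:
        active -= 1

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    list(executor.map(task, range(50)))

print(peak)  # never exceeds MAX_THREADS
```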

Progress Monitoring

Real-time Progress Updates

All multi-threaded scripts show progress:
if count % 100 == 0 or count == total:
    elapsed = time.time() - start_time
    print(f"[{count}/{total}] | Success: {success_count} | "
          f"Errors: {error_count} | Elapsed: {elapsed:.1f}s")
Example output:
[100/2775] | Success: 98 | Errors: 2 | Elapsed: 45.2s
[200/2775] | Success: 197 | Errors: 3 | Elapsed: 89.7s
[300/2775] | Success: 295 | Errors: 5 | Elapsed: 134.1s

Retry Strategies

Implicit Retry (Re-run Script)

Most scripts use a simple retry strategy:
# Skip if file already exists and has data
if os.path.exists(output_path) and os.path.getsize(output_path) > 10:
    return "skipped"
To force refresh:
FORCE_UPDATE = True  # Set in script configuration

if os.path.exists(output_path) and not FORCE_UPDATE:
    return "skipped"

No Automatic Retry

Scripts do NOT automatically retry failed requests within the same run because:
  • Keeps code simple and predictable
  • Failures are often due to missing data (not transient errors)
  • Re-running the script is sufficient for transient failures
Failed requests handling:
try:
    response = requests.post(api_url, json=payload, timeout=10)
    if response.status_code == 200:
        # Process and save
        return "success"
    else:
        return f"http_{response.status_code}"
except Exception as e:
    return "error"
Failed stocks are logged but not retried. Re-run the script to retry failures.

Best Practices

  • Use documented thread counts - these are optimized through production testing.
  • Always set timeouts - prevents hanging on slow/dead connections.
  • Monitor progress - print updates every 50-100 stocks to track execution.
  • Handle 429 gracefully - implement backoff for rate-sensitive endpoints.
  • Batch when possible - 100 ISINs per batch is 10x faster than individual requests.
  • Add inter-batch delays - 500ms sleep prevents triggering rate limits.
  • Skip existing files - avoid re-fetching unless FORCE_UPDATE is enabled.
  • Log all failures - track error counts to identify systemic issues.

Performance Benchmarks

Pipeline Completion Times

Script | Thread Count | Stocks | Avg Time | Req/Second
Advanced Indicators | 50 | 2,775 | 3-5 min | ~9-15
Announcements | 40 | 2,775 | 4-6 min | ~8-12
Company Filings | 20 | 2,775 | 8-12 min | ~4-6
Market News | 15 | 2,775 | 10-15 min | ~3-5
OHLCV (Historical) | 15 | 2,775 | 30-40 min | ~1-2
Fundamental Data | Batch | 2,775 | 2-3 min | ~15-20
Full pipeline (16 scripts): ~4 minutes (excluding optional OHLCV). With lifetime OHLCV: ~35 minutes total.
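The Req/Second column is just the stock count divided by elapsed seconds; taking mid-range timings from the table as examples:

```python
stocks = 2775

# Mid-range timings from the benchmark table (minutes)
for script, minutes in [("Advanced Indicators", 4), ("Company Filings", 10)]:
    rate = stocks / (minutes * 60)
    print(f"{script}: ~{rate:.1f} req/s")
```

Both values fall inside the ranges shown above (~9-15 and ~4-6 respectively).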

Troubleshooting

Too Many Timeouts

Symptoms:
  • High error count
  • “Timeout” messages in logs
Solutions:
  1. Reduce thread count by 25-50%
  2. Increase timeout value (10s → 15s)
  3. Check network connectivity

HTTP 429 Rate Limits

Symptoms:
  • “rate_limit” in error messages
  • Sudden spike in failures
Solutions:
  1. Reduce thread count (e.g., 40 → 20)
  2. Add delays between requests
  3. Implement exponential backoff
  4. Wait 5-10 minutes before retrying

Incomplete Data

Symptoms:
  • Success count < total stocks
  • Missing JSON files for some symbols
Solutions:
  1. Check error_count in final report
  2. Re-run script (skips existing files)
  3. Enable FORCE_UPDATE to refresh all
  4. Check if endpoint is down for specific stocks

Memory Issues

Symptoms:
  • Script crashes on large datasets
  • “MemoryError” exceptions
Solutions:
  1. Reduce thread count to limit concurrent memory usage
  2. Process in smaller batches
  3. Free memory between batches
import gc

for batch in batches:
    process_batch(batch)
    gc.collect()  # Force garbage collection

Optimization Tips

Incremental Updates: Use file existence checks to skip already-fetched data:
if os.path.exists(output_path) and os.path.getsize(output_path) > 10:
    return "skipped"
Connection Pooling: Reuse HTTP sessions for better performance:
session = requests.Session()
response = session.post(url, json=payload, timeout=10)
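In threaded scripts, note that requests.Session is not documented as thread-safe; a common pattern is one session per worker thread via threading.local (get_session is a hypothetical helper, not part of the pipeline):

```python
import threading

import requests

_thread_local = threading.local()

def get_session():
    # Lazily create one Session per worker thread and reuse it,
    # so each thread keeps its own connection pool.
    if not hasattr(_thread_local, "session"):
        _thread_local.session = requests.Session()
    return _thread_local.session

# Inside a worker function:
# response = get_session().post(url, json=payload, timeout=10)
```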
Batch Deduplication: For hybrid endpoints (company filings), deduplicate in memory before saving:
unique_map = {}
for entry in combined:
    key = entry.get("news_id") or f"{entry['date']}_{entry['caption']}"
    if key not in unique_map:
        unique_map[key] = entry
Monitor System Resources: Watch CPU and memory during execution to fine-tune thread counts.
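A lightweight way to take such a snapshot from inside a script, using only the standard library (Unix-only; psutil offers a portable alternative), might look like:

```python
import os
import resource  # standard library, Unix-only

def snapshot_resources():
    """Rough process/system stats for tuning thread counts."""
    usage = resource.getrusage(resource.RUSAGE_SELF)
    peak_rss = usage.ru_maxrss        # KB on Linux, bytes on macOS
    load_1min = os.getloadavg()[0]    # 1-minute system load average
    return peak_rss, load_1min, os.cpu_count()

peak_rss, load, cores = snapshot_resources()
print(f"peak RSS: {peak_rss} | load: {load:.2f} | cores: {cores}")
```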